type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
+(*
+ separate capacity reservation for replies and watch events:
+ this allows a domain to keep working even when under a constant flood of
+ watch events
+*)
+type capacity = { maxoutstanding: int; maxwatchevents: int }
+
+module Queue = BoundedQueue
+
+type packet_class =
+ | CommandReply
+ | Watchevent
+
+let string_of_packet_class = function
+ | CommandReply -> "command_reply"
+ | Watchevent -> "watch_event"
+
type t =
{
backend: backend;
- pkt_out: Packet.t Queue.t;
+ pkt_out: (Packet.t, packet_class) Queue.t;
mutable partial_in: partial_buf;
mutable partial_out: string;
+ capacity: capacity
}
+let to_read con =
+ match con.partial_in with
+ | HaveHdr partial_pkt -> Partial.to_complete partial_pkt
+ | NoHdr (i, _) -> i
+
+let debug t =
+ Printf.sprintf "XenBus state: partial_in: %d needed, partial_out: %d bytes, pkt_out: %d packets, %s"
+ (to_read t)
+ (String.length t.partial_out)
+ (Queue.length t.pkt_out)
+ (BoundedQueue.debug string_of_packet_class t.pkt_out)
+
let init_partial_in () = NoHdr
(Partial.header_size (), Bytes.make (Partial.header_size()) '\000')
let s = if String.length con.partial_out > 0 then
con.partial_out
else if Queue.length con.pkt_out > 0 then
- Packet.to_string (Queue.pop con.pkt_out)
+ let pkt = Queue.pop con.pkt_out in
+ Packet.to_string pkt
else
"" in
(* send data from s, and save the unsent data to partial_out *)
(* after sending one packet, partial is empty *)
con.partial_out = ""
+(* we can only process an input packet if we're guaranteed to have room
+ to store the response packet *)
+let can_input con = Queue.can_push con.pkt_out CommandReply
+
(* NB: can throw Reconnect *)
let input con =
- let to_read =
- match con.partial_in with
- | HaveHdr partial_pkt -> Partial.to_complete partial_pkt
- | NoHdr (i, _) -> i in
+ if not (can_input con) then None
+ else
+ let to_read = to_read con in
(* try to get more data from input stream *)
let b = Bytes.make to_read '\000' in
None
)
-let newcon backend = {
+let classify t =
+ match t.Packet.ty with
+ | Op.Watchevent -> Watchevent
+ | _ -> CommandReply
+
+let newcon ~capacity backend =
+ let limit = function
+ | CommandReply -> capacity.maxoutstanding
+ | Watchevent -> capacity.maxwatchevents
+ in
+ {
backend = backend;
- pkt_out = Queue.create ();
+ pkt_out = Queue.create ~capacity:(capacity.maxoutstanding + capacity.maxwatchevents) ~classify ~limit;
partial_in = init_partial_in ();
partial_out = "";
+ capacity = capacity;
}
let open_fd fd = newcon (Fd { fd = fd; })
type backend_fd = { fd : Unix.file_descr; }
type backend = Fd of backend_fd | Xenmmap of backend_mmap
type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
+type capacity = { maxoutstanding: int; maxwatchevents: int }
type t
val init_partial_in : unit -> partial_buf
val reconnect : t -> unit
-val queue : t -> Packet.t -> unit
+val queue : t -> Packet.t -> unit option
val read_fd : backend_fd -> 'a -> bytes -> int -> int
val read_mmap : backend_mmap -> 'a -> bytes -> int -> int
val read : t -> bytes -> int -> int
val write : t -> string -> int -> int
val output : t -> bool
val input : t -> Packet.t option
-val newcon : backend -> t
-val open_fd : Unix.file_descr -> t
-val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> t
+val newcon : capacity:capacity -> backend -> t
+val open_fd : Unix.file_descr -> capacity:capacity -> t
+val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> capacity:capacity -> t
val close : t -> unit
val is_fd : t -> bool
val is_mmap : t -> bool
val output_len : t -> int
+val can_input: t -> bool
val has_new_output : t -> bool
val has_old_output : t -> bool
val has_output : t -> bool
val has_more_input : t -> bool
val is_selectable : t -> bool
val get_fd : t -> Unix.file_descr
+val debug: t -> string
open Xenbus
let data_concat ls = (String.concat "\000" ls) ^ "\000"
+let queue con pkt = let r = Xb.queue con pkt in assert (r <> None)
let queue_path ty (tid: int) (path: string) con =
let data = data_concat [ path; ] in
- Xb.queue con (Xb.Packet.create tid 0 ty data)
+ queue con (Xb.Packet.create tid 0 ty data)
(* operations *)
let directory tid path con = queue_path Xb.Op.Directory tid path con
let getperms tid path con = queue_path Xb.Op.Getperms tid path con
let debug commands con =
- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
+ queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
let watch path data con =
let data = data_concat [ path; data; ] in
- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
+ queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
let unwatch path data con =
let data = data_concat [ path; data; ] in
- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
+ queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
let transaction_start con =
- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
+ queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
let transaction_end tid commit con =
let data = data_concat [ (if commit then "T" else "F"); ] in
- Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
+ queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
let introduce domid mfn port con =
let data = data_concat [ Printf.sprintf "%u" domid;
Printf.sprintf "%nu" mfn;
string_of_int port; ] in
- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
+ queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
let release domid con =
let data = data_concat [ Printf.sprintf "%u" domid; ] in
- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
+ queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
let resume domid con =
let data = data_concat [ Printf.sprintf "%u" domid; ] in
- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
+ queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
let getdomainpath domid con =
let data = data_concat [ Printf.sprintf "%u" domid; ] in
- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
+ queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
let write tid path value con =
let data = path ^ "\000" ^ value (* no NULL at the end *) in
- Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
+ queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
let mkdir tid path con = queue_path Xb.Op.Mkdir tid path con
let rm tid path con = queue_path Xb.Op.Rm tid path con
let setperms tid path perms con =
let data = data_concat [ path; perms ] in
- Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
+ queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
let close con =
Xb.close con.xb
+let capacity = { Xb.maxoutstanding = 1; maxwatchevents = 0; }
+
let open_fd fd = {
- xb = Xb.open_fd fd;
+ xb = Xb.open_fd ~capacity fd;
watchevents = Queue.create ();
}
let xenstore_payload_max = 4096 (* xen/include/public/io/xs_wire.h *)
+type 'a bounded_sender = 'a -> unit option
+(** a bounded sender accepts an ['a] item and returns:
+ None - if there is no room to accept the item
+ Some () - if it has successfully accepted/sent the item
+ *)
+
+module BoundedPipe : sig
+ type 'a t
+
+ (** [create ~capacity ~destination] creates a bounded pipe with a
+ local buffer holding at most [capacity] items. Once the buffer is
+ full it will not accept further items. items from the pipe are
+ flushed into [destination] as long as it accepts items. The
+ destination could be another pipe.
+ *)
+ val create: capacity:int -> destination:'a bounded_sender -> 'a t
+
+ (** [is_empty t] returns whether the local buffer of [t] is empty. *)
+ val is_empty : _ t -> bool
+
+ (** [length t] the number of items in the internal buffer *)
+ val length: _ t -> int
+
+ (** [flush_pipe t] sends as many items from the local buffer as possible,
+ which could be none. *)
+ val flush_pipe: _ t -> unit
+
+ (** [push t item] tries to [flush_pipe] and then push [item]
+ into the pipe if its [capacity] allows.
+ Returns [None] if there is no more room
+ *)
+ val push : 'a t -> 'a bounded_sender
+end = struct
+ (* items are enqueued in [q], and then flushed to [connect_to] *)
+ type 'a t =
+ { q: 'a Queue.t
+ ; destination: 'a bounded_sender
+ ; capacity: int
+ }
+
+ let create ~capacity ~destination =
+ { q = Queue.create (); capacity; destination }
+
+ let rec flush_pipe t =
+ if not Queue.(is_empty t.q) then
+ let item = Queue.peek t.q in
+ match t.destination item with
+ | None -> () (* no room *)
+ | Some () ->
+ (* successfully sent item to next stage *)
+ let _ = Queue.pop t.q in
+ (* continue trying to send more items *)
+ flush_pipe t
+
+ let push t item =
+ (* first try to flush as many items from this pipe as possible to make room,
+ it is important to do this first to preserve the order of the items
+ *)
+ flush_pipe t;
+ if Queue.length t.q < t.capacity then begin
+ (* enqueue, instead of sending directly.
+ this ensures that [out] sees the items in the same order as we receive them
+ *)
+ Queue.push item t.q;
+ Some (flush_pipe t)
+ end else None
+
+ let is_empty t = Queue.is_empty t.q
+ let length t = Queue.length t.q
+end
+
type watch = {
con: t;
token: string;
path: string;
base: string;
is_relative: bool;
+ pending_watchevents: Xenbus.Xb.Packet.t BoundedPipe.t;
}
and t = {
anonid: int;
mutable stat_nb_ops: int;
mutable perm: Perms.Connection.t;
+ pending_source_watchevents: (watch * Xenbus.Xb.Packet.t) BoundedPipe.t
}
+module Watch = struct
+ module T = struct
+ type t = watch
+
+ let compare w1 w2 =
+ (* cannot compare watches from different connections *)
+ assert (w1.con == w2.con);
+ match String.compare w1.token w2.token with
+ | 0 -> String.compare w1.path w2.path
+ | n -> n
+ end
+ module Set = Set.Make(T)
+
+ let flush_events t =
+ BoundedPipe.flush_pipe t.pending_watchevents;
+ not (BoundedPipe.is_empty t.pending_watchevents)
+
+ let pending_watchevents t =
+ BoundedPipe.length t.pending_watchevents
+end
+
+let source_flush_watchevents t =
+ BoundedPipe.flush_pipe t.pending_source_watchevents
+
+let source_pending_watchevents t =
+ BoundedPipe.length t.pending_source_watchevents
+
let mark_as_bad con =
match con.dom with
|None -> ()
token = token;
path = path;
base = get_path con;
- is_relative = path.[0] <> '/' && path.[0] <> '@'
+ is_relative = path.[0] <> '/' && path.[0] <> '@';
+ pending_watchevents = BoundedPipe.create ~capacity:!Define.maxwatchevents ~destination:(Xenbus.Xb.queue con.xb)
}
let get_con w = w.con
Perms.Connection.create ~perms:[Perms.READ; Perms.WRITE] domid
let create xbcon dom =
+ let destination (watch, pkt) =
+ BoundedPipe.push watch.pending_watchevents pkt
+ in
let id =
match dom with
| None -> let old = !anon_id_next in incr anon_id_next; old
anonid = id;
stat_nb_ops = 0;
perm = make_perm dom;
+
+ (* the actual capacity will be lower, this is used as an overflow
+ buffer: anything that doesn't fit elsewhere gets put here, only
+ limited by the amount of watches that you can generate with a
+ single xenstore command (which is finite, although possibly very
+ large in theory for Dom0). Once the pipe here has any contents the
+ domain is blocked from sending more commands until it is empty
+ again though.
+ *)
+ pending_source_watchevents = BoundedPipe.create ~capacity:Sys.max_array_length ~destination
}
in
Logging.new_connection ~tid:Transaction.none ~con:(get_domstr con);
let is_backend_mmap con = Xenbus.Xb.is_mmap con.xb
-let send_reply con tid rid ty data =
+let packet_of con tid rid ty data =
if (String.length data) > xenstore_payload_max && (is_backend_mmap con) then
- Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000")
+ Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000"
else
- Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid ty data)
+ Xenbus.Xb.Packet.create tid rid ty data
+
+let send_reply con tid rid ty data =
+ let result = Xenbus.Xb.queue con.xb (packet_of con tid rid ty data) in
+ (* should never happen: we only process an input packet when there is room for an output packet *)
+ (* and the limit for replies is different from the limit for watch events *)
+ assert (result <> None)
let send_error con tid rid err = send_reply con tid rid Xenbus.Xb.Op.Error (err ^ "\000")
let send_ack con tid rid ty = send_reply con tid rid ty "OK\000"
apath, w
let del_watches con =
- Hashtbl.clear con.watches;
+ Hashtbl.reset con.watches;
con.nb_watches <- 0
let del_transactions con =
- Hashtbl.clear con.transactions
+ Hashtbl.reset con.transactions
let list_watches con =
let ll = Hashtbl.fold
let lookup_watch_perms oldroot root path =
lookup_watch_perm path oldroot @ lookup_watch_perm path (Some root)
-let fire_single_watch_unchecked watch =
+let fire_single_watch_unchecked source watch =
let data = Utils.join_by_null [watch.path; watch.token; ""] in
- send_reply watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data
+ let pkt = packet_of watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data in
+
+ match BoundedPipe.push source.pending_source_watchevents (watch, pkt) with
+ | Some () -> () (* packet queued *)
+ | None ->
+ (* a well behaved Dom0 shouldn't be able to trigger this,
+ if it happens it is likely a Dom0 bug causing runaway memory usage
+ *)
+ failwith "watch event overflow, cannot happen"
-let fire_single_watch (oldroot, root) watch =
+let fire_single_watch source (oldroot, root) watch =
let abspath = get_watch_path watch.con watch.path |> Store.Path.of_string in
let perms = lookup_watch_perms oldroot root abspath in
if Perms.can_fire_watch watch.con.perm perms then
- fire_single_watch_unchecked watch
+ fire_single_watch_unchecked source watch
else
let perms = perms |> List.map (Perms.Node.to_string ~sep:" ") |> String.concat ", " in
let con = get_domstr watch.con in
Logging.watch_not_fired ~con perms (Store.Path.to_string abspath)
-let fire_watch roots watch path =
+let fire_watch source roots watch path =
let new_path =
if watch.is_relative && path.[0] = '/'
then begin
end else
path
in
- fire_single_watch roots { watch with path = new_path }
+ fire_single_watch source roots { watch with path = new_path }
(* Search for a valid unused transaction id. *)
let rec valid_transaction_id con proposed_id =
let has_partial_input con = Xenbus.Xb.has_partial_input con.xb
let has_more_input con = Xenbus.Xb.has_more_input con.xb
+let can_input con = Xenbus.Xb.can_input con.xb && BoundedPipe.is_empty con.pending_source_watchevents
let has_output con = Xenbus.Xb.has_output con.xb
let has_old_output con = Xenbus.Xb.has_old_output con.xb
let has_new_output con = Xenbus.Xb.has_new_output con.xb
&& (has_extra_connection_data con || has_transaction_data con)
let has_more_work con =
- has_more_input con || not (has_old_output con) && has_new_output con
+ (has_more_input con && can_input con) || not (has_old_output con) && has_new_output con
let incr_ops con = con.stat_nb_ops <- con.stat_nb_ops + 1
domains: (int, Connection.t) Hashtbl.t;
ports: (Xeneventchn.t, Connection.t) Hashtbl.t;
mutable watches: Connection.watch list Trie.t;
+ mutable has_pending_watchevents: Connection.Watch.Set.t
}
let create () = {
anonymous = Hashtbl.create 37;
domains = Hashtbl.create 37;
ports = Hashtbl.create 37;
- watches = Trie.create ()
+ watches = Trie.create ();
+ has_pending_watchevents = Connection.Watch.Set.empty;
}
+let get_capacity () =
+ (* not multiplied by maxwatch on purpose: 2nd queue in watch itself! *)
+ { Xenbus.Xb.maxoutstanding = !Define.maxoutstanding; maxwatchevents = !Define.maxwatchevents }
+
let add_anonymous cons fd =
- let xbcon = Xenbus.Xb.open_fd fd in
+ let capacity = get_capacity () in
+ let xbcon = Xenbus.Xb.open_fd fd ~capacity in
let con = Connection.create xbcon None in
Hashtbl.add cons.anonymous (Xenbus.Xb.get_fd xbcon) con
let add_domain cons dom =
- let xbcon = Xenbus.Xb.open_mmap (Domain.get_interface dom) (fun () -> Domain.notify dom) in
+ let capacity = get_capacity () in
+ let xbcon = Xenbus.Xb.open_mmap ~capacity (Domain.get_interface dom) (fun () -> Domain.notify dom) in
let con = Connection.create xbcon (Some dom) in
Hashtbl.add cons.domains (Domain.get_id dom) con;
match Domain.get_port dom with
Hashtbl.fold (fun _ con (ins, outs) ->
if (only_if con) then (
let fd = Connection.get_fd con in
- (fd :: ins, if Connection.has_output con then fd :: outs else outs)
+ let in_fds = if Connection.can_input con then fd :: ins else ins in
+ let out_fds = if Connection.has_output con then fd :: outs else outs in
+ in_fds, out_fds
) else (ins, outs)
)
cons.anonymous ([], [])
| [] -> None
| ws -> Some ws
+let del_watches cons con =
+ Connection.del_watches con;
+ cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+ cons.has_pending_watchevents <-
+ cons.has_pending_watchevents |> Connection.Watch.Set.filter @@ fun w ->
+ Connection.get_con w != con
+
let del_anonymous cons con =
try
Hashtbl.remove cons.anonymous (Connection.get_fd con);
- cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+ del_watches cons con;
Connection.close con
with exn ->
debug "del anonymous %s" (Printexc.to_string exn)
| Some p -> Hashtbl.remove cons.ports p
| None -> ())
| None -> ());
- cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+ del_watches cons con;
Connection.close con
with exn ->
debug "del domain %u: %s" id (Printexc.to_string exn)
cons.watches <- Trie.set cons.watches key watches;
watch
-let del_watches cons con =
- Connection.del_watches con;
- cons.watches <- Trie.map (del_watches_of_con con) cons.watches
-
(* path is absolute *)
-let fire_watches ?oldroot root cons path recurse =
+let fire_watches ?oldroot source root cons path recurse =
let key = key_of_path path in
let path = Store.Path.to_string path in
let roots = oldroot, root in
let fire_watch _ = function
| None -> ()
- | Some watches -> List.iter (fun w -> Connection.fire_watch roots w path) watches
+ | Some watches -> List.iter (fun w -> Connection.fire_watch source roots w path) watches
in
let fire_rec _x = function
| None -> ()
| Some watches ->
- List.iter (Connection.fire_single_watch roots) watches
+ List.iter (Connection.fire_single_watch source roots) watches
in
Trie.iter_path fire_watch cons.watches key;
if recurse then
Trie.iter fire_rec (Trie.sub cons.watches key)
+let send_watchevents cons con =
+ cons.has_pending_watchevents <-
+ cons.has_pending_watchevents |> Connection.Watch.Set.filter Connection.Watch.flush_events;
+ Connection.source_flush_watchevents con
+
let fire_spec_watches root cons specpath =
+ let source = find_domain cons 0 in
iter cons (fun con ->
- List.iter (Connection.fire_single_watch (None, root)) (Connection.get_watches con specpath))
+ List.iter (Connection.fire_single_watch source (None, root)) (Connection.get_watches con specpath))
let set_target cons domain target_domain =
let con = find_domain cons domain in
let domains = Hashtbl.fold (fun _ con accu -> Connection.debug con :: accu) cons.domains [] in
String.concat "" (domains @ anonymous)
+let debug_watchevents cons con =
+ (* == (physical equality)
+ has to be used here because w.con.xb.backend might contain a [unit->unit] value causing regular
+ comparison to fail due to having a 'functional value' which cannot be compared.
+ *)
+ let s = cons.has_pending_watchevents |> Connection.Watch.Set.filter (fun w -> w.con == con) in
+ let pending = s |> Connection.Watch.Set.elements
+ |> List.map (fun w -> Connection.Watch.pending_watchevents w) |> List.fold_left (+) 0 in
+ Printf.sprintf "Watches with pending events: %d, pending events total: %d" (Connection.Watch.Set.cardinal s) pending
+
let filter ~f cons =
let fold _ v acc = if f v then v :: acc else acc in
[]
let maxwatch = ref (100)
let maxtransaction = ref (10)
let maxrequests = ref (1024) (* maximum requests per transaction *)
+let maxoutstanding = ref (1024) (* maximum outstanding requests, i.e. in-flight requests / domain *)
+let maxwatchevents = ref (1024)
+(*
+ maximum outstanding watch events per watch,
+ recommended >= maxoutstanding to avoid blocking backend transactions due to
+ malicious frontends
+ *)
let gc_max_overhead = ref 120 (* 120% see comment in xenstored.ml *)
let conflict_burst_limit = ref 5.0
quota-transaction = 10
quota-maxrequests = 1024
quota-path-max = 1024
+quota-maxoutstanding = 1024
+quota-maxwatchevents = 1024
# Activate filed base backend
persistent = false
| path :: "" :: [] -> Store.Path.create path (Connection.get_path con)
| _ -> raise Invalid_Cmd_Args
-let process_watch t cons =
+let process_watch source t cons =
let oldroot = t.Transaction.oldroot in
let newroot = Store.get_root t.Transaction.store in
let ops = Transaction.get_paths t |> List.rev in
| Xenbus.Xb.Op.Rm -> true, None, oldroot
| Xenbus.Xb.Op.Setperms -> false, Some oldroot, newroot
| _ -> raise (Failure "huh ?") in
- Connections.fire_watches ?oldroot root cons (snd op) recurse in
- List.iter (fun op -> do_op_watch op cons) ops
+ Connections.fire_watches ?oldroot source root cons (snd op) recurse in
+ List.iter (fun op -> do_op_watch op cons) ops;
+ Connections.send_watchevents cons source
let create_implicit_path t perm path =
let dirname = Store.Path.get_parent path in
| "watches" :: _ ->
let watches = Connections.debug cons in
Some (watches ^ "\000")
+ | "xenbus" :: domid :: _ ->
+ let domid = int_of_string domid in
+ let con = Connections.find_domain cons domid in
+ let s = Printf.sprintf "xenbus: %s; overflow queue length: %d, can_input: %b, has_more_input: %b, has_old_output: %b, has_new_output: %b, has_more_work: %b. pending: %s"
+ (Xenbus.Xb.debug con.xb)
+ (Connection.source_pending_watchevents con)
+ (Connection.can_input con)
+ (Connection.has_more_input con)
+ (Connection.has_old_output con)
+ (Connection.has_new_output con)
+ (Connection.has_more_work con)
+ (Connections.debug_watchevents cons con)
+ in
+ Some s
| "mfn" :: domid :: _ ->
let domid = int_of_string domid in
let con = Connections.find_domain cons domid in
fct con t doms cons data;
Packet.Ack (fun () ->
if Transaction.get_id t = Transaction.none then
- process_watch t cons
+ process_watch con t cons
)
let reply_data fct con t doms cons data =
Packet.Ack (fun () ->
(* xenstore.txt says this watch is fired immediately,
implying even if path doesn't exist or is unreadable *)
- Connection.fire_single_watch_unchecked watch)
+ Connection.fire_single_watch_unchecked con watch)
let do_unwatch con _t _domains cons data =
let (node, token) =
if not success then
raise Transaction_again;
if commit then begin
- process_watch t cons;
+ process_watch con t cons;
match t.Transaction.ty with
| Transaction.No ->
() (* no need to record anything *)
let do_input store cons doms con =
let newpacket =
try
- Connection.do_input con
+ if Connection.can_input con then Connection.do_input con
+ else None
with Xenbus.Xb.Reconnect ->
info "%s requests a reconnect" (Connection.get_domstr con);
History.reconnect con;
Connection.incr_ops con
let do_output _store _cons _doms con =
+ Connection.source_flush_watchevents con;
if Connection.has_output con then (
if Connection.has_new_output con then (
let packet = Connection.peek_output con in
("quota-maxentity", Config.Set_int Quota.maxent);
("quota-maxsize", Config.Set_int Quota.maxsize);
("quota-maxrequests", Config.Set_int Define.maxrequests);
+ ("quota-maxoutstanding", Config.Set_int Define.maxoutstanding);
+ ("quota-maxwatchevents", Config.Set_int Define.maxwatchevents);
("quota-path-max", Config.Set_int Define.path_max);
("gc-max-overhead", Config.Set_int Define.gc_max_overhead);
("test-eagain", Config.Set_bool Transaction.test_eagain);